package com.xx.dk.tbsalling.aismessages.ais.messages.handler;

import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xx.common.constant.Constants;
import com.xx.dk.tbsalling.aismessages.ais.messages.ExtendedDynamicDataReport;
import com.xx.dk.tbsalling.aismessages.ais.messages.PositionReport;
import com.xx.dk.tbsalling.aismessages.utils.LimitedSizeMap;
import com.xx.domain.ShipBaseInfo;
import com.xx.domain.Track;
import com.xx.netty.server.constants.AisConstants;
import com.xx.service.RuleCalcService;
import com.xx.util.AisToTrackUtil;
import com.xx.utils.IdWorker;
import com.xx.utils.redis.RedisUtil;
import com.xx.web.domain.MgTrack;
import com.xx.web.domain.entity.ZlShip;
import com.xx.web.mapper.ZlShipMapper;
import com.xx.web.proto.ProtoBufUtil;
import com.xx.web.proto.ShipOuterClass;
import com.xx.zmq.ZmqServer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description:
 * @author: xx
 * @Date 2023/7/20 9:19
 * @version: 1.0
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class DynamicDataReportHandler {
    private final RedisUtil redisUtil;

    private final RuleCalcService ruleCalcService;

    private final ThreadPoolTaskExecutor aisThreadPool;
    private final ZlShipMapper shipMapper;

    private final IdWorker idWorker;

    //public static LimitedSizeQueue<ShipOuterClass.Ship> queue = new LimitedSizeQueue<>(1000);
    public static LimitedSizeMap<Integer, ShipOuterClass.Ship> queue = new LimitedSizeMap<>(1000);
    public static List<ShipOuterClass.Ship> ships = new CopyOnWriteArrayList<>();

    public static List<MgTrack> trackList = new CopyOnWriteArrayList<>();

    private static final ExecutorService zmqExecutor = Executors.newSingleThreadExecutor();


    public void positionReportDispose(PositionReport data) {
        MgTrack mgTrack = new MgTrack();
        AisToTrackUtil.setTrack(data, mgTrack);
        Integer mmsi = data.getSourceMmsi().getMMSI();
        if (mmsi == 0) {
            return;
        }
        ShipBaseInfo baseInfo = redisUtil.get(AisConstants.AIS_SHIP_INFO + mmsi);
        assembledTrack(mgTrack, mmsi, baseInfo);
    }

    public String getTrackKey(Integer mmsId) {
        return Constants.SHIP_TRACK_KEY + LocalDate.now() + ":" + mmsId;
    }

    public void extendedDynamicDataReportHandler(ExtendedDynamicDataReport report) {
        MgTrack mgTrack = new MgTrack();
        Integer mmsi = AisToTrackUtil.setTrack(report, mgTrack);
        if (mmsi == 0) {
            return;
        }
        ShipBaseInfo baseInfo = redisUtil.get(AisConstants.AIS_SHIP_INFO + mmsi);
        assembledTrack(mgTrack, mmsi, baseInfo);
    }

    private void assembledTrack(MgTrack mgTrack, Integer mmsi, ShipBaseInfo baseInfo) {
        if (baseInfo != null) {
            //push track
            if (ships.size() < 200) {
                aisThreadPool.execute(() -> {
                    Track track;
                    ShipBaseInfo baseInfo2;
                    if (baseInfo == null) {
                        ZlShip zlShip =
                                shipMapper.selectOne(Wrappers.<ZlShip>lambdaQuery().select(ZlShip::getShipNameEn,
                                        ZlShip::getLocalName, ZlShip::getFlagCode, ZlShip::getLoa, ZlShip::getBm).eq(ZlShip::getMmsi, mmsi).last("limit 1"));
                        baseInfo2 = new ShipBaseInfo();
                        if (zlShip != null) {
                            baseInfo2.setZlShip(zlShip);
                            baseInfo2.setName(zlShip.getShipNameEn());
                        }
                        baseInfo2.setMmsi(mmsi);
                        track = new Track(baseInfo2);
                        redisUtil.set(AisConstants.AIS_SHIP_INFO + mmsi, baseInfo2, 86400 * 7L);
                    } else {
                        track = new Track(baseInfo);
                    }

                    track.setTrack(mgTrack);
                    ShipOuterClass.Ship.Builder builder = ShipOuterClass.Ship.newBuilder();
                    ProtoBufUtil.transformProtoReturnBuilder(builder, track);
                    ShipOuterClass.Ship ship = builder.build();
                    ships.add(ship);
                    zmqExecutor.execute(() -> {
                        ZmqServer.sendData("aship", ship.toByteArray());
                    });
                    synchronized (DynamicDataReportHandler.queue.lock) {
                        queue.put(ship.getMmsi(), ship);
                    }
                    mgTrack.setId(idWorker.nextIdStr());
                    trackList.add(mgTrack);
                    redisUtil.zAddStr2(getTrackKey(mmsi), JSONObject.toJSONString(mgTrack), ship.getTime());
                    ruleCalcService.calcWarning(ship);
                });
            }
        }
    }

}
